package cAmqp

import (
	"time"

	"github.com/gin-gonic/gin"
	"github.com/rabbitmq/amqp091-go"

	"gitee.com/csingo/cLog"
)

// basicConsume registers a push ("ServerPush") consumer on driver.QueueName and
// processes deliveries in batches until the channel closes, the delivery
// stream ends, or an auto-deleted queue has drained.
//
// Deliveries are buffered in memory and handed to option.Handler through
// runMessageHandlerFunc when either:
//   - the buffer reaches option.BatchSize, or
//   - more than option.WaitCount+1 consecutive idle polls have elapsed since
//     the last delivery while the buffer is non-empty.
//
// Idle-poll counting only begins after the first delivery has been received,
// so a consumer that has never seen a message keeps polling indefinitely.
// Any deliveries still buffered when the function returns are flushed by a
// deferred call.
//
// Returns:
//   - the error from channel.Consume if the subscription fails;
//   - AmqpChannelCloseErr if the channel is closed or the delivery stream is
//     closed (e.g. the consumer was cancelled);
//   - nil when option.Queue.AutoDeleted is set and the queue has gone idle
//     with nothing left to flush.
func basicConsume(ctx *gin.Context, channel *amqp091.Channel, driver *AmqpConf_Driver, option ConsumeOption) (err error) {
	var start bool  // whether idle-poll counting has begun (true once a delivery was seen)
	var count int64 // consecutive idle polls since the last delivery
	var msgs []amqp091.Delivery
	var deliveries <-chan amqp091.Delivery

	// Flush whatever is still buffered on every exit path.
	defer func() {
		if len(msgs) > 0 {
			runMessageHandlerFunc(ctx, &msgs, option.AutoAck, driver, option.Handler)
		}
	}()

	deliveries, err = channel.Consume(
		driver.QueueName,
		option.Tag,
		option.AutoAck,
		option.Exclusive,
		option.NoLocal,
		option.NoWait,
		option.Arguments,
	)
	if err != nil {
		return err
	}

	// option is passed by value, so the wait duration is loop-invariant.
	wait := time.Duration(option.WaitTime) * time.Second

	// A fresh map per log call, in case the logger retains or mutates it.
	logFields := func() map[string]any {
		return map[string]any{
			"source": "cAmqp.basicConsume",
			"driver": driver,
		}
	}

	// Consume messages.
	for {
		if channel.IsClosed() {
			return AmqpChannelCloseErr
		}
		if option.PauseHandler != nil && option.PauseHandler(ctx, option.Params) {
			cLog.WithContext(ctx, logFields()).Tracef("AMQP 消费者队列暂停消费(ServerPush)，等待 %d 秒后继续", option.WaitTime)
			time.Sleep(wait)
			continue
		}
		select {
		case msg, ok := <-deliveries:
			if !ok {
				// The delivery chan is closed when the channel/connection
				// shuts down or the consumer is cancelled. Without this check
				// a closed chan yields zero-value deliveries forever, which
				// would be buffered and passed to the handler in a hot loop.
				return AmqpChannelCloseErr
			}
			start = true
			count = 0
			msgs = append(msgs, msg)
			// Batch processing: hand off once the buffer is full.
			if int64(len(msgs)) >= option.BatchSize {
				runMessageHandlerFunc(ctx, &msgs, option.AutoAck, driver, option.Handler)
			}
		default:
			if start {
				count++
			}

			// Still within the idle budget: sleep and poll again.
			if count <= option.WaitCount+1 {
				cLog.WithContext(ctx, logFields()).Tracef("AMQP 消费者队列暂无数据(ServerPush)，等待 %d 秒后继续", option.WaitTime)
				time.Sleep(wait)
				continue
			}

			// Idle budget exhausted with a partial batch: flush it now. There
			// is no sleep here; the following idle polls do the waiting.
			// NOTE(review): presumably runMessageHandlerFunc empties msgs —
			// confirm, otherwise this branch would repeat every WaitCount+2 polls.
			if len(msgs) > 0 {
				cLog.WithContext(ctx, logFields()).Tracef("AMQP 消费者队列暂无数据(ServerPush)，清空缓存数据并等待 %d 秒后继续", option.WaitTime)
				count = 0
				runMessageHandlerFunc(ctx, &msgs, option.AutoAck, driver, option.Handler)
				continue
			}

			// Auto-deleted queue that has gone idle with nothing buffered:
			// stop consuming.
			if option.Queue.AutoDeleted {
				cLog.WithContext(ctx, logFields()).Trace("AMQP 消费者退出(ServerPush)")
				return nil
			}

			cLog.WithContext(ctx, logFields()).Tracef("AMQP 消费者队列暂无数据(ServerPush)，等待 %d 秒后继续", option.WaitTime)
			time.Sleep(wait)
		}
	}
}
